Source code for hysop.backend.device.opencl.operator.external_force

# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import primefac, functools
import sympy as sm

from hysop.tools.numerics import float_to_complex_dtype
from hysop.tools.numpywrappers import npw
from hysop.tools.decorators import debug
from hysop.tools.warning import HysopWarning
from hysop.tools.htypes import first_not_None, to_tuple, check_instance
from hysop.core.graph.graph import op_apply
from hysop.core.memory.memory_request import MemoryRequest
from hysop.fields.continuous_field import Field, ScalarField
from hysop.parameters.tensor_parameter import TensorParameter
from hysop.parameters.scalar_parameter import ScalarParameter
from hysop.backend.device.opencl.opencl_fft import OpenClFFT
from hysop.backend.device.opencl.opencl_symbolic import OpenClSymbolic
from hysop.backend.device.codegen.base.variables import dtype_to_ctype
from hysop.symbolic import local_indices_symbols
from hysop.symbolic.relational import Assignment
from hysop.symbolic.complex import ComplexMul
from hysop.symbolic.misc import Select
from hysop.symbolic.field import AppliedSymbolicField, SymbolicField, curl, laplacian
from hysop.symbolic.parameter import SymbolicScalarParameter
from hysop.symbolic.spectral import AppliedSpectralTransform
from hysop.operator.base.external_force import (
    ExternalForce,
    SpectralExternalForceOperatorBase,
)


[docs] class OpenClSpectralExternalForce(SpectralExternalForceOperatorBase, OpenClSymbolic): """ Operator to compute the curl of a symbolic expression. """ @debug def __new__(cls, Fext, **kwds): return super().__new__(cls, Fext=Fext, **kwds) @debug def __init__(self, Fext, **kwds): check_instance(Fext, SymbolicExternalForce) super().__init__(Fext=Fext, **kwds)
[docs] class SymbolicExternalForce(ExternalForce): @debug def __new__(cls, name, Fext, diffusion=None, **kwds): return super().__new__(cls, name=name, dim=None, Fext=Fext, **kwds) @debug def __init__(self, name, Fext, diffusion=None, **kwds): """ Specify an external force as a tuple of symbolic expressions. 2D ExternalForce example: 1) Fext = -rho*g*e_y where rho is a field and g a constant Fext = (0, -rho.s()*g) 2) Fext = (Rs*S+C)*e_y Fext = (0, -Rs*S.s()+C.s()) """ Fext = to_tuple(Fext) dim = len(Fext) Fext = list(Fext) for i, e in enumerate(Fext): if isinstance(e, type(None)): Fext[i] = 0 # curl(const) = 0 msg = ( 'Expression "{}" contains a SymbolicField, did you forget to apply it ?' ) msg = msg.format(e) if isinstance(e, sm.Basic): assert not e.atoms(SymbolicField), msg Fext = tuple(Fext) super().__init__(name=name, dim=dim, Fext=Fext, **kwds) diffusion = first_not_None(diffusion, {}) diffusion = {k: v for (k, v) in diffusion.items() if (v is not None)} for k, v in diffusion.items(): assert k in self.input_fields(), k.short_description() assert isinstance(v, ScalarParameter) self._diffusion = diffusion
[docs] def input_fields(self): return set(map(lambda f: f.field, self._extract_objects(AppliedSymbolicField)))
[docs] def output_fields(self): return set(self._diffusion.keys())
[docs] def input_params(self): p0 = set( map(lambda p: p.parameter, self._extract_objects(SymbolicScalarParameter)) ) p1 = set(self._diffusion.values()) return p0.union(p1)
[docs] def output_params(self): return set()
[docs] def initialize(self, op): tg = op.new_transform_group() fft_fields = tuple(self.input_fields()) forward_transforms = {} backward_transforms = {} for Si in fft_fields: Fi = tg.require_forward_transform(Si) forward_transforms[Si] = Fi if Si in self.diffusion: Bi = tg.require_backward_transform(Si) backward_transforms[Si] = Bi force_backward_transforms = {} for Fi in op.force.fields: force_backward_transforms[Fi] = tg.require_backward_transform( Fi, custom_input_buffer="B0" ) frame = None fft_expressions = () for e in self.Fext: if isinstance(e, sm.Basic): efields = tuple(e.atoms(AppliedSymbolicField)) for sf in efields: field = sf.field assert field in forward_transforms, field.name if field in self._diffusion: assert field in backward_transforms, field.name replace = {sf: forward_transforms[sf.field].s for sf in efields} frame = next(iter(replace.values)).frame e = e.xreplace(replace) fft_expressions += (e,) if frame is None: msg = "Could not extract frame from expressions." raise RuntimeError(frame) fft_expressions = to_tuple(curl(fft_expressions, frame)) self.tg = tg self.forward_transforms = forward_transforms self.backward_transforms = backward_transforms self.force_backward_transforms = force_backward_transforms self.fft_expressions = fft_expressions self.compute_required_kernels(op)
[docs] def compute_required_kernels(self, op): dts = op.dt.s forces = op.force.s() diffusion_kernels = {} for f, nu in self.diffusion.items(): nus = nu.s kname = f"filter_diffusion_{f.dim}d_{f.name}" Ft = self.forward_transforms[f] Fs = Ft.output_symbolic_array(f"{f.name}_hat") E = 0 Wn = self.tg.push_expressions(laplacian(Ft.s, Ft.s.frame)) for Wi in Wn: Wi = self.tg._indexed_wave_numbers[Wi] E += Wi expr = Assignment(Fs, Fs / (1 - nus * dts * E)) op.require_symbolic_kernel(kname, expr) diffusion_kernels[f] = kname force_kernels = () vorticity_kernels = () assert ( len(op.vorticity.fields) == len(op.force.fields) == len(self.fft_expressions) ) for Fi, Wi, e in zip( op.force.fields, op.vorticity.fields, self.fft_expressions ): if e == 0: force_kernels += (None,) vorticity_kernels += (None,) continue Fi_hat = self.force_backward_transforms[Fi] Fi_buf = Fi_hat.input_symbolic_array(f"{Fi.name}_hat") Wn = self.tg.push_expressions(Assignment(Fi_hat, e)) msg = "Could not extract transforms." try: transforms = e.atoms(AppliedSpectralTransform) except AttributeError: raise RuntimeError(msg) assert len(transforms) >= 1, msg fft_buffers = { Ft.s: Ft.output_symbolic_array(f"{Ft.field.name}_hat") for Ft in self.forward_transforms.values() } wavenumbers = {Wi: self.tg._indexed_wave_numbers[Wi] for Wi in Wn} replace = {} replace.update(fft_buffers) replace.update(wavenumbers) expr = e.xreplace(replace) expr = Assignment(Fi_buf, expr) kname = f"compute_{Fi.var_name}" op.require_symbolic_kernel(kname, expr) force_kernels += (kname,) Fis = Fi.s() Wis = Wi.s() expr = Assignment(Wis, Wis + dts * Fis) kname = f"update_{Wi.var_name}" op.require_symbolic_kernel(kname, expr) vorticity_kernels += (kname,) assert len(diffusion_kernels) == len(self.diffusion) assert ( len(force_kernels) == op.vorticity.nb_components == len(vorticity_kernels) ) self.diffusion_kernel_names = diffusion_kernels self.force_kernel_names = force_kernels self.vorticity_kernel_names = vorticity_kernels
[docs] def discretize(self, op): pass
[docs] def get_mem_requests(self, op): requests = {} for Fi in self.forward_transforms.keys(): Ft = self.forward_transforms[Fi] Bt = self.backward_transforms.get(Fi, None) if Bt is not None: assert Ft.backend is Bt.backend assert Ft.output_dtype == Bt.input_dtype, ( Ft.output_dtype, Bt.input_dtype, ) assert Ft.output_shape == Bt.input_shape, ( Ft.output_shape, Bt.input_shape, ) shape = Ft.output_shape dtype = Ft.output_dtype request = MemoryRequest( backend=self.tg.backend, dtype=dtype, shape=shape, nb_components=1, alignment=op.min_fft_alignment, ) name = f"{Ft.field.name}_hat" requests[name] = request return requests
[docs] def pre_setup(self, op, work): for Fi in self.forward_transforms.keys(): Ft = self.forward_transforms[Fi] Bt = self.backward_transforms.get(Fi, None) (dtmp,) = work.get_buffer(op, f"{Ft.field.name}_hat") Ft.configure_output_buffer(dtmp) if Bt is not None: Bt.configure_input_buffer(dtmp)
[docs] def post_setup(self, op, work): diffusion_kernels = {} force_kernels = {} compute_statistics = {} vorticity_kernels = {} ghost_exchangers = {} queue = self.tg.backend.cl_env.default_queue def build_launcher(knl, update_params): def kernel_launcher(knl=knl, update_params=update_params, queue=queue): kwds = update_params() return knl(queue=queue, **kwds) return kernel_launcher for field, kname in self.diffusion_kernel_names.items(): dfield = op.get_input_discrete_field(field) knl, update_params = op.symbolic_kernels[kname] diffusion_kernels[field] = build_launcher(knl, update_params) ghost_exchangers[field] = functools.partial( dfield.build_ghost_exchanger(), queue=queue ) if op.Fmin is not None: min_values = npw.asarray(op.Fmin()).copy() if op.Fmax is not None: max_values = npw.asarray(op.Fmax()).copy() for i, (kname0, kname1) in enumerate( zip(self.force_kernel_names, self.vorticity_kernel_names) ): if kname0 is None: assert kname1 is None continue Wi = op.vorticity.fields[i] Fi = op.force.fields[i] dWi = op.dW.dfields[i] dFi = op.dF.dfields[i] knl, update_params = op.symbolic_kernels[kname0] force_kernels[(Fi, Wi)] = build_launcher(knl, update_params) knl, update_params = op.symbolic_kernels[kname1] vorticity_kernels[(Fi, Wi)] = build_launcher(knl, update_params) ghost_exchangers[Wi] = functools.partial( dWi.build_ghost_exchanger(), queue=queue ) def compute_statistic( op=op, queue=queue, dFi=dFi, min_values=min_values, max_values=max_values, ): if op.Fmin is not None: min_values[i] = dFi.sdata.min(queue=queue).get() if op.Fmax is not None: max_values[i] = dFi.sdata.max(queue=queue).get() compute_statistics[Fi] = compute_statistic def update_statistics(op=op, min_values=min_values, max_values=max_values): if op.Fmin is not None: op.Fmin.value = min_values if op.Fmax is not None: op.Fmax.value = max_values if op.Finf is not None: op.Finf.value = npw.maximum(npw.abs(min_values), npw.abs(max_values)) assert ( len(diffusion_kernels) == len(self.diffusion) == len(self.backward_transforms) ) assert ( len(vorticity_kernels) == len(force_kernels) == len(self.force_backward_transforms) ) assert len(ghost_exchangers) == len(diffusion_kernels) + len(vorticity_kernels) self.diffusion_kernels = diffusion_kernels self.force_kernels = force_kernels self.vorticity_kernels = vorticity_kernels self.ghost_exchangers = ghost_exchangers self.compute_statistics = compute_statistics self.update_statistics = update_statistics
@op_apply def apply(self, op, **kwds): for field, Ft in self.forward_transforms.items(): evt = Ft() if field in self.backward_transforms: evt = self.diffusion_kernels[field]() evt = self.backward_transforms[field]() evt = self.ghost_exchangers[field]() for Fi, Wi in self.force_kernels.keys(): evt = self.force_kernels[(Fi, Wi)]() evt = self.force_backward_transforms[Fi]() evt = self.compute_statistics[Fi]() evt = self.vorticity_kernels[(Fi, Wi)]() evt = self.ghost_exchangers[Wi]() self.update_statistics() def _extract_objects(self, obj_type): objs = set() for e in self.Fext: try: objs.update(e.atoms(obj_type)) except AttributeError: pass return objs
[docs] def short_description(self): return f"SymbolicExternalForce[name={self.name}]"
[docs] def long_description(self): sep = "\n *" expressions = sep + sep.join(f"F{x} = {e}" for (x, e) in zip("xyz", self.Fext)) diffusion = sep + sep.join( f"{f.pretty_name}: {p.pretty_name}" for (f, p) in self.diffusion.items() ) input_fields = ", ".join(f.pretty_name for f in self.input_fields()) output_fields = ", ".join(f.pretty_name for f in self.output_fields()) input_params = ", ".join(p.pretty_name for p in self.input_params()) output_params = ", ".join(p.pretty_name for p in self.output_params()) ss = """SymbolicExternalForce: name: {} pretty_name: {} expressions: {} diffusion: {} ----------------- input_fields: {} output_fields: {} input_params: {} output_params: {} """.format( self.name, self.pretty_name, expressions, diffusion, input_fields, output_fields, input_params, output_params, ) return ss